Library Imports

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

Template

spark = (
    SparkSession.builder
    .master("local")
    .appName("Exploring Joins")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

sc = spark.sparkContext

Initial Datasets

pets = spark.createDataFrame(
    [
        (1, 1, 'Bear'),
        (2, 1, 'Chewie'),
        (3, 2, 'Roger'),
    ], ['id', 'breed_id', 'nickname']
)

pets.toPandas()
   id  breed_id  nickname
0   1         1      Bear
1   2         1    Chewie
2   3         2     Roger

breeds = spark.createDataFrame(
    [
        (1, 'Pitbull', 10), 
        (2, 'Corgie', 20), 
    ], ['breed_id', 'name', 'average_height']
)

breeds.toPandas()
   breed_id     name  average_height
0         1  Pitbull              10
1         2   Corgie              20

Filter Pushdown

Filter pushdown improves performance by reducing the amount of data shuffled during your DataFrame transformations.

Depending on your filter logic and where you place your filter code, Spark will behave differently.

Case #1: Filtering on Only One Side of the Join

df = (
    pets
    .join(breeds, 'breed_id', 'left_outer')
    .filter(F.col('nickname') == 'Chewie')
)

df.toPandas()
   breed_id  id  nickname     name  average_height
0         1   2    Chewie  Pitbull              10

df.explain()
== Physical Plan ==
*(4) Project [breed_id#1L, id#0L, nickname#2, name#7, average_height#8L]
+- SortMergeJoin [breed_id#1L], [breed_id#6L], LeftOuter
   :- *(2) Sort [breed_id#1L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(breed_id#1L, 200)
   :     +- *(1) Filter (isnotnull(nickname#2) && (nickname#2 = Chewie))
   :        +- Scan ExistingRDD[id#0L,breed_id#1L,nickname#2]
   +- *(3) Sort [breed_id#6L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(breed_id#6L, 200)
         +- Scan ExistingRDD[breed_id#6L,name#7,average_height#8L]

What Happened:

Because the column nickname exists only on the left side of the join, only the left side was filtered before the join.

Case #2: Filter on Both Sides of the Join

df = (
    pets
    .join(breeds, 'breed_id', 'left_outer')
    .filter(F.col('breed_id') == 1)
)

df.toPandas()
   breed_id  id  nickname     name  average_height
0         1   1      Bear  Pitbull              10
1         1   2    Chewie  Pitbull              10

df.explain()
== Physical Plan ==
*(4) Project [breed_id#1L, id#0L, nickname#2, name#7, average_height#8L]
+- SortMergeJoin [breed_id#1L], [breed_id#6L], LeftOuter
   :- *(2) Sort [breed_id#1L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(breed_id#1L, 200)
   :     +- *(1) Filter (isnotnull(breed_id#1L) && (breed_id#1L = 1))
   :        +- Scan ExistingRDD[id#0L,breed_id#1L,nickname#2]
   +- *(3) Sort [breed_id#6L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(breed_id#6L, 200)
         +- Scan ExistingRDD[breed_id#6L,name#7,average_height#8L]

What Happened:

The column breed_id is present on both sides of the join, but only the left side was filtered before the join. With a left outer join, Spark only pushed the filter to the left side; compare this with Case #3, where the same filter combined with an inner join is pushed to both sides.

Case #3: Filter on Both Sides of the Join, Inner Join

df = (
    pets
    .join(breeds, 'breed_id')  # no join type given, so Spark defaults to an inner join
    .filter(F.col('breed_id') == 1)
)

df.toPandas()
   breed_id  id  nickname     name  average_height
0         1   1      Bear  Pitbull              10
1         1   2    Chewie  Pitbull              10

df.explain()
== Physical Plan ==
*(5) Project [breed_id#1L, id#0L, nickname#2, name#7, average_height#8L]
+- *(5) SortMergeJoin [breed_id#1L], [breed_id#6L], Inner
   :- *(2) Sort [breed_id#1L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(breed_id#1L, 200)
   :     +- *(1) Filter (isnotnull(breed_id#1L) && (breed_id#1L = 1))
   :        +- Scan ExistingRDD[id#0L,breed_id#1L,nickname#2]
   +- *(4) Sort [breed_id#6L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(breed_id#6L, 200)
         +- *(3) Filter (isnotnull(breed_id#6L) && (breed_id#6L = 1))
            +- Scan ExistingRDD[breed_id#6L,name#7,average_height#8L]

What Happened:

The column breed_id is present on both sides of the join, and because the join is an inner join, Spark was able to figure out that it could apply the filter to both sides before the join.

Case #4: Filter on Both Sides of the Join, Filter Beforehand

df = (
    pets
    .join(
        breeds.filter(F.col('breed_id') == 1), 
        'breed_id', 
        'left_outer'
    )
    .filter(F.col('breed_id') == 1)
)

df.toPandas()
   breed_id  id  nickname     name  average_height
0         1   1      Bear  Pitbull              10
1         1   2    Chewie  Pitbull              10

df.explain()
== Physical Plan ==
*(5) Project [breed_id#1L, id#0L, nickname#2, name#7, average_height#8L]
+- SortMergeJoin [breed_id#1L], [breed_id#6L], LeftOuter
   :- *(2) Sort [breed_id#1L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(breed_id#1L, 200)
   :     +- *(1) Filter (isnotnull(breed_id#1L) && (breed_id#1L = 1))
   :        +- Scan ExistingRDD[id#0L,breed_id#1L,nickname#2]
   +- *(4) Sort [breed_id#6L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(breed_id#6L, 200)
         +- *(3) Filter (isnotnull(breed_id#6L) && (breed_id#6L = 1))
            +- Scan ExistingRDD[breed_id#6L,name#7,average_height#8L]

What Happened:

The column breed_id is present on both sides of the join, and because we filtered breeds explicitly before joining, both sides were filtered before the join.

Summary

  • To improve join performance, you should always try to push filters before the join.
  • Spark is sometimes smart enough to figure out that a filter can be applied to both sides of the join, but not always.
  • Always check the physical plan of your join (via explain()) and verify that any filter that can be pushed before the join actually is, as shown in the sketch below.
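
For example, when the optimizer will not push a post-join filter down for you (as with the left outer join in Case #2), you can apply the filter to each input explicitly and then verify the plan. A minimal sketch of that pattern, reusing the pets and breeds DataFrames from above (the filtered_* names are only illustrative):

# Filter each side explicitly instead of relying on the optimizer
# to push a post-join filter down for you.
filtered_pets = pets.filter(F.col('breed_id') == 1)
filtered_breeds = breeds.filter(F.col('breed_id') == 1)

df = filtered_pets.join(filtered_breeds, 'breed_id', 'left_outer')

# Confirm in the physical plan that each Filter sits below its Exchange
# (the shuffle), so less data is shuffled for the join.
df.explain()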
